-
Notifications
You must be signed in to change notification settings - Fork 14.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-12469: Deprecated and corrected topic metrics for consumer (KIP-1109) #18232
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@apoorvmittal10 : Thanks for the PR. Left a few comments.
// Remove deprecated metrics. | ||
metrics.removeSensor(deprecatedMetricName(partitionRecordsLagMetricName(tp))); | ||
metrics.removeSensor(deprecatedMetricName(partitionRecordsLeadMetricName(tp))); | ||
metrics.removeMetric(partitionPreferredReadReplicaMetricNameDeprecated(tp)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
partitionPreferredReadReplicaMetricNameDeprecated => deprecatedPartitionPreferredReadReplicaMetricName to be consistent with other names?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
metricsManager.recordBytesFetched(topicName2, 1); | ||
// Another 8 metrics gets registered as deprecated metrics should be reported for topicName2. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
gets => get
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
// 5 new metrics shall be registered. | ||
assertEquals(5, metrics.metrics().size() - additionalRegisteredMetricsSize); | ||
|
||
// Remove 1 topic which has deprecated metrics as well. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
1 topic => 1 partition
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
@@ -181,6 +195,67 @@ void maybeUpdateAssignment(SubscriptionState subscription) { | |||
} | |||
} | |||
|
|||
@Deprecated // To be removed in Kafka 5.0 release. | |||
private void maybeRecordDeprecatedBytesFetched(String name, String topic, int bytes) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be useful to follow the approach used in https://github.com/apache/kafka/pull/11302/files to add "deprecated" in the description of the metric name and update the ops doc about the deprecation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As the metrics with a specfic tag has been deprectaed hence it's hard to specify same metric as deprecated. Hence I have added Note
to consumer topic metrics. For consumer metrics ops need not to ne updated as ops.html uses metrics description to generate details on kafka-site, I have verified that the added Note
can be viewed on kafka-site locally. Please let me know if you think it should be handled differently.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally speaking, we should always include a note explaining the deprecation. This should include the first version it was deprecated, when it's expected to be removed and what should be used instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, so currently below Note
has been added to metrics.
Current:
"Note: For topic names with periods (.), an additional metric with underscores is emitted.
However, the periods replaced metric is deprecated.
Please use the metric with actual topic name instead."
Example:
Should we change it to:
"Note: For topic names with periods (.), an additional metric with underscores is emitted.
However, the periods replaced metric is deprecated since Kafka 4.1 and shall be removed in Kafka 5.0.
Please use the metric with actual topic name instead."
wdyt?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@apoorvmittal10 thanks for this patch!
.build(); | ||
maybeRecordDeprecatedPartitionLag(name, tp, lag); | ||
|
||
Sensor recordsLag = new SensorBuilder(metrics, name, () -> Map.of("topic", tp.topic(), "partition", String.valueOf(tp.partition()))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sense, I earlier considered it but didn't do as previous tags were not ordered. I have added the change.
metricsManager.recordPartitionLag(tp3, 4); | ||
metricsManager.recordPartitionLead(tp3, 2); | ||
|
||
int additionalRegisteredMetricsSize = metrics.metrics().size(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't it be sensible to assert the number of additional registered metrics too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I haven't asserted them on purpose as above tests already validated them.
metricsManager.maybeUpdateAssignment(subscriptionState); | ||
// For tp2, 14 metrics will be unregistered. 3 for partition lag, 3 for partition lead, 1 for | ||
// preferred read replica and similarly 7 deprecated metrics. | ||
assertEquals(-9, metrics.metrics().size() - additionalRegisteredMetricsSize); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This -9 is needlessly obscure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have added a comment and toggled the check, please let me know if it's better now.
|
||
import static org.apache.kafka.clients.consumer.internals.FetchMetricsManager.topicPartitionTags; | ||
import static org.apache.kafka.clients.consumer.internals.FetchMetricsManager.topicTags; | ||
import static org.junit.jupiter.api.Assertions.assertEquals; | ||
import static org.junit.jupiter.api.Assertions.assertTrue; | ||
|
||
public class FetchMetricsManagerTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Personally, I would put @SuppressWarnings("deprecation")
on this class because it is knowingly using deprecated methods.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a suspicion you put the suppression on FetchMetricsManager
when I meant to put it on this test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have corrected it, also I have placed the suppression on individual tests so they can be removed while removing deprecated code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@apoorvmittal10 : Thanks for the updated PR. One more comment. Also, is the test failure related to this PR?
.build(); | ||
maybeRecordDeprecatedPartitionLag(name, tp, lag); | ||
|
||
Sensor recordsLag = new SensorBuilder(metrics, name, () -> mkMap(mkEntry("topic", tp.topic()), mkEntry("partition", String.valueOf(tp.partition())))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we do the same for the deprecated metrics too for better consistency?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason why you're still using the Utils.mkMap
methods instead of Map.of
from Java 9+?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ijuma please see my previous comment #18232 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, if we want insertion ordering to be preserved, we should change SensorBuilder
to use a LinkedHashMap
(SequencedMap
would be better, but that requires Java 21). Having this implicit requirement and hoping people get it right is very error prone.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah agree a explicit requirement to send an ordered map would be better in builder itself.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have opened https://issues.apache.org/jira/browse/KAFKA-18390 to address @ijuma's comment. The fix is targeted for version 4.1.0, as it may introduce significant changes to the codebase.
Also, is the test failure related to this PR? It seems to be build scan publish failure. I ll keep an eye on current ongoing build to see if it passes. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@apoorvmittal10 : Thanks for the updated PR. LGTM. I will wait to see if other reviewers have more comments.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
} | ||
} | ||
|
||
@Deprecated // To be removed in Kafka 5.0 release. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please open the jira to ensure we will remove them from 5.0 :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@AndrewJSchofield Please let us know if it's good to merge? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just one final comment to do with suppression of deprecation warnings. Apart from that, this looks good to me.
/** | ||
* The {@link FetchMetricsManager} class provides wrapper methods to record lag, lead, latency, and fetch metrics. | ||
* It keeps an internal ID of the assigned set of partitions which is updated to ensure the set of metrics it | ||
* records matches up with the topic-partitions in use. | ||
*/ | ||
@SuppressWarnings("deprecation") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't believe this suppression has any value because it's implementing deprecated code, as opposed to using it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes you are right, I have corrected it.
|
||
import static org.apache.kafka.clients.consumer.internals.FetchMetricsManager.topicPartitionTags; | ||
import static org.apache.kafka.clients.consumer.internals.FetchMetricsManager.topicTags; | ||
import static org.junit.jupiter.api.Assertions.assertEquals; | ||
import static org.junit.jupiter.api.Assertions.assertTrue; | ||
|
||
public class FetchMetricsManagerTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a suspicion you put the suppression on FetchMetricsManager
when I meant to put it on this test.
return this.metrics.metricInstance(metricsRegistry.partitionPreferredReadReplica, metricTags); | ||
} | ||
|
||
@Deprecated |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should never deprecate something without including a @deprecated javadoc with the details I outlined in a separate comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, I have added details here #18232 (comment), please let me know your thoughts.
…-1109) (apache#18232) The PR implements the behaviour defined in KIP-1109. It corrects the consumer topic and topic-partition metrics, while deprecating the incorrect ones. Reviewers: Chia-Ping Tsai <[email protected]>, Jun Rao <[email protected]>, Andrew Schofield <[email protected]>
…-1109) (apache#18232) The PR implements the behaviour defined in KIP-1109. It corrects the consumer topic and topic-partition metrics, while deprecating the incorrect ones. Reviewers: Chia-Ping Tsai <[email protected]>, Jun Rao <[email protected]>, Andrew Schofield <[email protected]>
The PR implements the behaviour defined in KIP-1109. It corrects the consumer topic and topic-partition metrics, while deprecating the incorrect ones.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1109%3A+Unifying+Kafka+Consumer+Topic+Metrics
Committer Checklist (excluded from commit message)